package com.gemini.main.jraft.counter;

import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.core.StateMachineAdapter;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.Utils;
import com.gemini.main.jraft.counter.snapshot.CounterSnapshotFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Raft state machine that maintains a single {@code long} counter.
 *
 * <p>Committed log entries carry a {@link CounterOperation} (read or increment) which is applied
 * to the counter in {@link #onApply(Iterator)}. The counter can be persisted to and restored from
 * a snapshot file named {@code "data"}.
 *
 * @author zhanghailin
 */
public class CounterStateMachine extends StateMachineAdapter {

    private static final Logger LOG = LoggerFactory.getLogger(CounterStateMachine.class);

    /**
     * Counter value
     */
    private final AtomicLong value = new AtomicLong(0);
    /**
     * Leader term; set in {@link #onLeaderStart(long)} and reset to -1 in
     * {@link #onLeaderStop(Status)}.
     */
    private final AtomicLong leaderTerm = new AtomicLong(-1);

    /**
     * Tells whether this node currently considers itself the leader.
     *
     * @return {@code true} when the recorded leader term is greater than zero
     */
    public boolean isLeader() {
        return this.leaderTerm.get() > 0;
    }

    /**
     * Returns current value.
     */
    public long getValue() {
        return this.value.get();
    }

    /**
     * Key state machine callback.
     *
     * <p>Submitted tasks are accumulated and committed in batches inside jraft; what reaches the
     * state machine is an iterator over those tasks, represented by
     * {@code com.alipay.sofa.jraft.Iterator}.
     *
     * <p>Entries whose payload cannot be decoded are logged and skipped.
     *
     * @param iterator iterator over the committed tasks to apply
     */
    @Override
    public void onApply(Iterator iterator) {
        while (iterator.hasNext()) {
            long current = 0;
            CounterOperation counterOperation;
            CounterClosure closure = null;
            // Optimization: the done closure a leader receives can be a wrapper class that still
            // holds the un-serialized user request, so the request can be taken straight from the
            // closure instead of deserializing the entry data, saving CPU on the leader.
            if (iterator.done() != null) {
                // This task is applied by this node, get value from closure to avoid additional parsing.
                closure = (CounterClosure) iterator.done();
                counterOperation = closure.getCounterOperation();
            } else {
                // No closure attached: decode the operation from the entry payload.
                counterOperation = decodeOperation(iterator.getData());
            }
            // Dispatch on the request type.
            if (counterOperation != null) {
                switch (counterOperation.getOp()) {
                    case CounterOperation.GET:
                        current = this.value.get();
                        LOG.info("Get value={} at logIndex={}", current, iterator.getIndex());
                        break;
                    case CounterOperation.INCREMENT:
                        final long delta = counterOperation.getDelta();
                        // Single atomic step so the logged previous value matches the update.
                        final long prev = this.value.getAndAdd(delta);
                        current = prev + delta;
                        LOG.info("Added value={} by delta={} at logIndex={}", prev, delta, iterator.getIndex());
                        break;
                    default:
                        // Unknown op codes leave the counter untouched; make that visible in the log.
                        LOG.warn("Unknown counter operation op={} at logIndex={}", counterOperation.getOp(),
                                iterator.getIndex());
                        break;
                }

                if (closure != null) {
                    closure.success(current);
                    closure.run(Status.OK());
                }
            }
            // Move on to the next task.
            iterator.next();
        }
    }

    /**
     * Decodes a {@link CounterOperation} from a log entry payload using the Hessian2 serializer.
     *
     * @param data payload of the current log entry; its backing array is passed to the
     *             deserializer as-is (assumes an array-backed buffer holding exactly the
     *             payload - TODO confirm)
     * @return the decoded operation, or {@code null} if decoding failed (the failure is logged)
     */
    private CounterOperation decodeOperation(final ByteBuffer data) {
        try {
            return SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
                    data.array(), CounterOperation.class.getName());
        } catch (final CodecException e) {
            LOG.error("Fail to decode CounterOperation", e);
            return null;
        }
    }

    /**
     * Saves the current counter value into a snapshot file named {@code "data"} under the
     * writer's path and reports the outcome through {@code done}.
     *
     * @param writer snapshot writer providing the target directory and file registration
     * @param done   callback run with {@code Status.OK()} on success or an {@code EIO} status
     *               on failure
     */
    @Override
    public void onSnapshotSave(SnapshotWriter writer, Closure done) {
        // Capture the value before handing off, so the task saves the value seen at call time.
        final long currVal = this.value.get();
        Utils.runInThread(() -> {
            final CounterSnapshotFile snapshot = new CounterSnapshotFile(writer.getPath() + File.separator + "data");
            if (snapshot.save(currVal)) {
                if (writer.addFile("data")) {
                    done.run(Status.OK());
                } else {
                    done.run(new Status(RaftError.EIO, "Fail to add file to writer"));
                }
            } else {
                done.run(new Status(RaftError.EIO, "Fail to save counter snapshot %s", snapshot.getPath()));
            }
        });
    }

    /**
     * Logs the raft error together with its stack trace.
     *
     * @param e the error reported by the raft node
     */
    @Override
    public void onError(final RaftException e) {
        // SLF4J placeholders are "{}"; the trailing throwable is logged with its stack trace.
        LOG.error("Raft error: {}", e, e);
    }

    /**
     * Restores the counter value from the {@code "data"} file of the given snapshot.
     *
     * @param reader snapshot reader pointing at the snapshot directory
     * @return {@code true} if the value was loaded; {@code false} if this node is the leader,
     *         the data file is not part of the snapshot, or reading it failed
     */
    @Override
    public boolean onSnapshotLoad(SnapshotReader reader) {
        if (isLeader()) {
            LOG.warn("Leader is not supposed to load snapshot");
            return false;
        }
        if (reader.getFileMeta("data") == null) {
            LOG.error("Fail to find data file in {}", reader.getPath());
            return false;
        }
        final CounterSnapshotFile snapshot = new CounterSnapshotFile(reader.getPath() + File.separator + "data");
        try {
            this.value.set(snapshot.load());
            return true;
        } catch (final IOException e) {
            // Keep the cause in the log so the I/O failure can be diagnosed.
            LOG.error("Fail to load snapshot from {}", snapshot.getPath(), e);
            return false;
        }
    }

    /**
     * Records the term in which this node became leader.
     *
     * @param term the new leader term
     */
    @Override
    public void onLeaderStart(final long term) {
        this.leaderTerm.set(term);
        super.onLeaderStart(term);
    }

    /**
     * Clears the recorded leader term when this node stops being leader.
     *
     * @param status the status passed by the raft node when leadership ends
     */
    @Override
    public void onLeaderStop(final Status status) {
        this.leaderTerm.set(-1);
        super.onLeaderStop(status);
    }
}
